// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Apache Camel Streamer

== Overview

This documentation page focuses on the Apache Camel, which can also be thought of as a universal streamer because it
allows you to consume from any technology or protocol supported by Camel into an Ignite Cache.

image::images/integrations/camel-streamer.png[Camel Streamer]

With this streamer, you can ingest entries straight into an Ignite cache based on:

* Calls received on a Web Service (SOAP or REST), by extracting the body or headers.
* Listening on a TCP or UDP channel for messages.
* The content of files received via FTP or written to the local filesystem.
* Email messages received via POP3 or IMAP.
* A MongoDB tailable cursor.
* An AWS SQS queue.
* And many others.

This streamer supports two modes of ingestion: **direct ingestion** and **mediated ingestion**.

[NOTE]
====
[discrete]
=== The Ignite Camel Component
There is also the https://camel.apache.org/components/latest/ignite-summary.html[camel-ignite, window=_blank] component, if what you are looking is
to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route.
====

== Maven Dependency

To make use of the `ignite-camel-ext` streamer, you need to add the following dependency:

[tabs]
--
tab:pom.xml[]
[source,xml]
----
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-camel-ext</artifactId>
    <version>${ignite-camel-ext.version}</version>
</dependency>
----
--

It will also pull in `camel-core` as a transitive dependency.

== Direct Ingestion

Direct Ingestion allows you to consume from any Camel endpoint straight into Ignite, with the help of a
Tuple Extractor. We call this **direct ingestion**.

Here is a code sample:
[tabs]
--
tab:Java[]
[source,java]
----
// Start Apache Ignite.
Ignite ignite = Ignition.start();

// Create an streamer pipe which ingests into the 'mycache' cache.
IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");

// Create a Camel streamer and connect it.
CamelStreamer<String, String> streamer = new CamelStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(pipe);

// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");

// This is the tuple extractor. We'll assume each message contains only one tuple.
// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {
    @Override public Map.Entry<String, String> extract(Exchange exchange) {
        String stationId = exchange.getIn().getHeader("X-StationId", String.class);
        String temperature = exchange.getIn().getBody(String.class);
        return new GridMapEntry<>(stationId, temperature);
    }
});

// Start the streamer.
streamer.start();
----
--

== Mediated Ingestion

For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and **ingest only the result into the Ignite cache**. 

We call this **mediated ingestion**.

[tabs]
--
tab:Java[]
[source,java]
----
// Create a CamelContext with a custom route that will:
//  (1) consume from our Jetty endpoint.
//  (2) transform incoming JSON into a Java object with Jackson.
//  (3) uses JSR 303 Bean Validation to validate the object.
//  (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
            .unmarshal().json(JsonLibrary.Jackson)
            .to("bean-validator:validate")
            .to("direct:ignite.ingest");
    }
});

// Remember our Streamer is now consuming from the Direct endpoint above.
streamer.setEndpointUri("direct:ignite.ingest");
----
--

== Setting a Response

By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request.
If you want to customize​ the response, set a Camel `Processor` as a `responseProcessor`:

[tabs]
--
tab:Java[]
[source,java]
----
streamer.setResponseProcessor(new Processor() {
    @Override public void process(Exchange exchange) throws Exception {
        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
        exchange.getOut().setBody("OK");
    }
});
----
--
